{
try
{
filestruct = getNextKey(filestruct);
if(filestruct == null)
{
continue;
}
fs = pq.poll();
}
if (fs != null
&& (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
{
// The keys are the same so we need to add this to the
// ldfs list
lastkey = fs.getKey();
lfs.add(fs);
}
else
{
Collections.sort(lfs, new FileStructComparator());
ColumnFamily columnFamily = null;
bufOut.reset();
if(lfs.size() > 1)
{
for (FileStruct filestruct : lfs)
{
try
{
/* read the length although we don't need it */
filestruct.getBufIn().readInt();
// Skip the Index
IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
merge(columnFamilies);
}
// deserialize into column families
columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
catch ( Exception ex)
{
logger_.warn("error in filecompaction", ex);
}
}
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
if( columnFamily != null )
{
/* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
}
}
else
{
FileStruct filestruct = lfs.get(0);
try
{
/* read the length although we don't need it */
int size = filestruct.getBufIn().readInt();
bufOut.write(filestruct.getBufIn(), size);
}
catch ( Exception ex)
{
ex.printStackTrace();
filestruct.close();
continue;
}
}
if ( ssTable == null )
{
ssTable = new SSTable(compactionFileLocation, mergedFileName);
}
ssTable.append(lastkey, bufOut);
/* Fill the bloom filter with the key */
doFill(compactedBloomFilter, lastkey);
totalkeysWritten++;
for (FileStruct filestruct : lfs)
{
try
{
filestruct.getNextKey();
if (filestruct.isExhausted())
{
continue;